package com.atguigu.dga.meta.service.impl;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SimplePropertyPreFilter;
import com.atguigu.dga.meta.bean.PageTableMetaInfo;
import com.atguigu.dga.meta.bean.TableMetaInfo;
import com.atguigu.dga.meta.mapper.TableMetaInfoMapper;
import com.atguigu.dga.meta.service.TableMetaInfoService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.thrift.TException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.sql.Timestamp;
import java.util.List;
import java.util.stream.Collectors;

/**
 * <p>
 * 元数据表 服务实现类
 * </p>
 *
 * @author atguigu
 * @since 2023-10-27
 */
@Service
public class TableMetaInfoServiceImpl extends ServiceImpl<TableMetaInfoMapper, TableMetaInfo> implements TableMetaInfoService {

    @Autowired
    ApplicationContext context;
    /*
        幂等性： 一种操作，操作1次和N次的效果时一样的。

     */
    @Override
    public void initDBMetaInfo(String db, String assessDate) throws Exception {
        //1.考虑到写入的幂等性，在写入之前，先清理掉之前已经写入的数据  目标表的粒度：库-表-考评日期 是一行
        remove(new QueryWrapper<TableMetaInfo>().eq("schema_name",db).eq("assess_date",assessDate));
        //2.从hive中抽取相关库中所有表的元数据信息
        List<TableMetaInfo> tableMetaInfos = extractMetaInfoFromHive(db, assessDate);
        //3.读取HDFS，补充相关表的元数据信息
        extractMetaInfoFromHDFS(tableMetaInfos);
        //4.写入数据库的表中
        saveBatch(tableMetaInfos);
    }

    /*
        方式一.不需要编写额外的sql，只利用MybatisPlus提供单表查询的功能
                a)根据表名，库名查询到TableMetaInfo信息
                b)根据库名，表名，层级查询TableMetaInfoExtra
                c)把以上信息合并到PageTableMetaInfo
                向Mysql发送的请求多！
                    请求数= 2 * 符合条件的表数

        方式二： 只发一次请求，自己编写sql用join的方式。
                    自己写sql
     */
    @Override
    public List<PageTableMetaInfo> queryDataList(int from, Integer pageSize, String tableName, String schemaName, String dwLevel) {
        //自己编写sql查询
        return baseMapper.queryDataList(from,pageSize,tableName,schemaName,dwLevel);
    }

    @Override
    public int queryTotal(int from, Integer pageSize, String tableName, String schemaName, String dwLevel) {
        return baseMapper.queryTotal(from,pageSize,tableName,schemaName,dwLevel);
    }

    @Override
    public List<TableMetaInfo> getAllTableMetaInfo(String db, String assessDate) {
        return baseMapper.getAllTableMetaInfo(db,assessDate);
    }


    private List<TableMetaInfo> extractMetaInfoFromHDFS(List<TableMetaInfo> tableMetaInfos) throws Exception {
        //1.创建hdfs客户端，需要hdfs的uri，及管理员用户的名字
        FileSystem hdfs = context.getBean(FileSystem.class);
        //2.获取容量相关
        FsStatus hdfsStatus = hdfs.getStatus();
        long used = hdfsStatus.getUsed();
        long remaining = hdfsStatus.getRemaining();
        long capacity = hdfsStatus.getCapacity();
        //3.遍历每一张表，读取hdfs上的信息
        tableMetaInfos
            .stream()
                .forEach(tableMetaInfo -> {
                    tableMetaInfo.setFsCapcitySize(capacity);
                    tableMetaInfo.setFsRemainSize(remaining);
                    tableMetaInfo.setFsUsedSize(used);
                    try {
                        //根据表的目录获取表中所有的子文件的状态信息
                        FileStatus[] fileStatuses = hdfs.listStatus(new Path(tableMetaInfo.getTableFsPath()));
                        /*
                            编写方法获取 table_size,table_total_size, 累加运算
                            最后一次访问时间，最后一次修改时间： 以表目录中子文件最大的最后一次访问时间和修改时间为准。 最大值比较运算。
                                做运算，必须保证属性不能为null，如果为null会报错空指针异常。
                               重要: 需要为Bean的以上四个属性，赋初始值，目的为了避免做运算，报错空指针异常。

                             遍历文件夹，需要使用递归！
                         */
                        statTableMetaInfo(fileStatuses,hdfs,tableMetaInfo);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });

        hdfs.close();
        return  tableMetaInfos;
    }

    /*
        TableMetaInfo tableMetaInfo: 表的元数据信息
        FileStatus[] fileStatuses： 表目录下第一级子目录的状态
        FileSystem hdfs： 客户端
     */
    private void statTableMetaInfo(FileStatus[] fileStatuses, FileSystem hdfs, TableMetaInfo tableMetaInfo) throws IOException {

        for (FileStatus fileStatus : fileStatuses) {

            //当前这个子文件是目录还是文件
            if (fileStatus.isFile()){
                //是文件，直接进行统计计算
                tableMetaInfo.setTableSize(tableMetaInfo.getTableSize() + fileStatus.getLen());
                tableMetaInfo.setTableTotalSize(tableMetaInfo.getTableTotalSize() + fileStatus.getLen() * fileStatus.getReplication());
                tableMetaInfo.getTableLastModifyTime().setTime(
                    Math.max(
                        tableMetaInfo.getTableLastModifyTime().getTime(),
                        fileStatus.getModificationTime()
                    )
                );
                tableMetaInfo.getTableLastAccessTime().setTime(
                    Math.max(
                        tableMetaInfo.getTableLastAccessTime().getTime(),
                        fileStatus.getAccessTime()
                    )
                );

            }else {
                //是目录 需要继续遍历目录
                FileStatus[] subFileStatus = hdfs.listStatus(fileStatus.getPath());
                //递归运算
                statTableMetaInfo(subFileStatus,hdfs,tableMetaInfo);
            }

        }

    }

    /*
        1.准备HiveMetaStoreClient
        2.查询库下所有的表
        3.为每张表都查询元数据，封装为Bean
     */
    private List<TableMetaInfo> extractMetaInfoFromHive(String db, String assessDate) throws MetaException {
        HiveMetaStoreClient client = context.getBean(HiveMetaStoreClient.class);
        //查询所有的表
        List<String> allTables = client.getAllTables(db);
        //在生成列的json时，只留下以下信息
        SimplePropertyPreFilter simplePropertyPreFilter = new SimplePropertyPreFilter("name", "type", "comment");
        //为每张表都查询元数据，封装为Bean
        List<TableMetaInfo> tableMetaInfos = allTables
            .stream()
            .map(name -> {
                try {
                    Table table = client.getTable(db, name);
                    TableMetaInfo tableMetaInfo = new TableMetaInfo();
                    //从Table中抽取相关的信息，赋值到TableMetaInfo中。
                    tableMetaInfo.setTableName(table.getTableName());
                    tableMetaInfo.setSchemaName(table.getDbName());
                    //大量的信息是在sd属性中
                    StorageDescriptor sd = table.getSd();
                    tableMetaInfo.setColNameJson(JSON.toJSONString(sd.getCols(),simplePropertyPreFilter));
                    tableMetaInfo.setPartitionColNameJson(JSON.toJSONString(table.getPartitionKeys(),simplePropertyPreFilter));
                    tableMetaInfo.setTableFsOwner(table.getOwner());
                    tableMetaInfo.setTableParametersJson(JSON.toJSONString(table.getParameters()));
                    tableMetaInfo.setTableComment(table.getParameters().get("comment"));
                    tableMetaInfo.setTableFsPath(sd.getLocation());
                    tableMetaInfo.setTableInputFormat(sd.getInputFormat());
                    tableMetaInfo.setTableOutputFormat(sd.getOutputFormat());
                    tableMetaInfo.setTableRowFormatSerde(sd.getSerdeInfo().getSerializationLib());
                    tableMetaInfo.setTableCreateTime(table.getCreateTime() + "");
                    tableMetaInfo.setTableType(table.getTableType());
                    tableMetaInfo.setTableBucketNum(sd.getNumBuckets() + 0l);
                    tableMetaInfo.setTableBucketColsJson(JSON.toJSONString(sd.getBucketCols(),simplePropertyPreFilter));
                    tableMetaInfo.setTableSortColsJson(JSON.toJSONString(sd.getSortCols(),simplePropertyPreFilter));

                    //添加考评日期
                    tableMetaInfo.setAssessDate(assessDate);
                    tableMetaInfo.setCreateTime(new Timestamp(System.currentTimeMillis()));
                    return tableMetaInfo;

                } catch (TException e) {
                    throw new RuntimeException(e);
                }
            })
            .collect(Collectors.toList());

        //关闭客户端
        client.close();
        return tableMetaInfos;
    }
}
